DynamoDBの変更履歴をKinesis Firehose経由でS3に保存してAthenaで見る
不具合の調査などをしているとき、「あのタイミングのDynamoDBテーブルのデータが分かれば助かるのに……」と思ったことは無いでしょうか? 私はあります。 Lambdaで参照したDynamoDBテーブルのログデータを見ても良いのですが、下記のように影響範囲を調査する際は有効ではありません。(ログを調べれば分かるけど大変)
- この値が原因なのは分かった。いつからこの値になっていた……?
というわけで本記事では、DynamoDBテーブルの変更履歴をLambdaとKinesis Firehoseを経由してS3に格納し、Athenaで参照する仕組みを構築してみました。Partition Projectionを使って、Athenaのパーティション更新を自動化しています。
おすすめの方
- AWS SAMを使ってみたい方
- DynamoDB Streamの情報をS3保存したい方
- Kinesis FirehoseがS3に保存したデータをAthenaで見たい方
- DynamoDBの変更履歴をAthenaで見たい方
- Athenaのパーティション更新を自動化したい方
DynamoDBとLambdaとS3を作る
AWS SAMを使って作ります。
SAM Init
sam init \ --runtime python3.7 \ --name DynamoDB-Update-Item-Athena \ --app-template hello-world
SAMテンプレート
DynamoDB・Lambda・Kinesis Firehose・S3バケットを定義しています。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: DynamoDB-Update-Item-Athena Resources: # 変更履歴を残したいDynamoDBテーブル TodoTable: Type: AWS::DynamoDB::Table Properties: TableName: todo-sample-table AttributeDefinitions: - AttributeName: todoId AttributeType: S KeySchema: - AttributeName: todoId KeyType: HASH BillingMode: PAY_PER_REQUEST StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES # DynamoDB Streamsで起動してKinesis FirehoseにPutするLambda ToFirehoseFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.7 Timeout: 30 Environment: Variables: DELIVERY_STREAM_NAME: !Ref TodoTableUpdateDeliveryStream Policies: - arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess Events: DynamoDBStream: Type: DynamoDB Properties: Stream: !GetAtt TodoTable.StreamArn BatchSize: 100 StartingPosition: TRIM_HORIZON BisectBatchOnFunctionError: true ToFirehoseFunctionLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/lambda/${ToFirehoseFunction} # DynamoDBの変更履歴を保存するS3バケット(Athena検索対象) TodoTableUpdateBucket: Type: AWS::S3::Bucket DeletionPolicy: Retain Properties: AccessControl: Private BucketName: cm-fujii.genki-dynamodb-update-bucket # S3バケットに格納するKinesis Firehose TodoTableUpdateDeliveryStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: ExtendedS3DestinationConfiguration: BucketARN: !GetAtt TodoTableUpdateBucket.Arn BufferingHints: IntervalInSeconds: 60 SizeInMBs: 50 CompressionFormat: UNCOMPRESSED Prefix: dynamodb_item/ ErrorOutputPrefix: error/ RoleARN: !GetAtt TodoTableUpdateDeliveryStreamRole.Arn TodoTableUpdateDeliveryStreamRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: firehose.amazonaws.com Action: sts:AssumeRole Policies: - PolicyName: "todo-table-update-firehose-delivery-policy" PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject Resource: !Sub "${TodoTableUpdateBucket.Arn}/*"
なお、Kinesis FirehoseのIntervalInSeconds
は実験用として短い60秒にしています。
実際に運用する場合はDynamoDBの変更頻度を考慮しつつ、下記を参考にデータ量・データ数・圧縮の有無を調整してみてください。
Lambdaコード
DynamoDB Streams経由で起動し、変更前と変更後のデータを保存しています。 このLambda関数では、DynamoDBにどのようなデータがあるのか?を意識していません。そのため、DynamoDBの項目を変更しても、このLambda自体の修正は不要です。
import json import os import boto3 from datetime import datetime from boto3.dynamodb.types import TypeDeserializer firehose = boto3.client('firehose') dynamodb_deserializer = TypeDeserializer() def lambda_handler(event, context): for item in event['Records']: timestamp = int(item['dynamodb']['ApproximateCreationDateTime']) to_firehose_item = { 'action': item['eventName'], 'keys': deserialize(item['dynamodb']['Keys']), 'new_image': None, 'old_image': None, 'timestamp': timestamp, 'timestamp_utc': datetime.utcfromtimestamp(timestamp) } if 'NewImage' in item['dynamodb']: to_firehose_item['new_image'] = deserialize(item['dynamodb']['NewImage']) if 'OldImage' in item['dynamodb']: to_firehose_item['old_image'] = deserialize(item['dynamodb']['OldImage']) firehose.put_record( DeliveryStreamName=os.environ['DELIVERY_STREAM_NAME'], Record={ 'Data': json.dumps(to_firehose_item, default=json_default) + '\n' } ) def deserialize(item): """ {'fooId': {'S': 'bar1234'}} を {'fooId': 'bar1234'} に変換する """ d = {} for key in item: d[key] = dynamodb_deserializer.deserialize(item[key]) return d def json_default(obj): try: return str(obj) except Exception as e: print(e) raise TypeError('Failed convert to str.')
デプロイする
sam build sam package \ --output-template-file packaged.yaml \ --s3-bucket cm-fujii.genki-deploy sam deploy \ --template-file packaged.yaml \ --stack-name DynamoDB-Update-Item-Athena-Stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset
DynamoDBテーブルを変更してみる
データ追加
ためしに2件のデータを追加します。
{ "todoId": "t0001" }
{ "todoId": "t2222", "title": "iPhone 12 miniを買う" }
データ変更
さきほどのデータを変更します。
{ "todoId": "t0001", "title": "いますぐ寝る!!!", "done": false }
{ "todoId": "t2222", "title": "iPhone 12 miniを予約する", "done": false }
データ削除
todoId:t0001
のデータを削除します。
S3に格納された様子
2020年10月26日分として、バッチリ保存されています。
以下はデータ追加した際のファイルの内容です(一部抜粋)。
{"action": "INSERT", "keys": {"todoId": "t0001"}, "new_image": {"todoId": "t0001"}, "old_image": null, "timestamp": 1603679304, "timestamp_utc": "2020-10-26 02:28:24"}
Amazon Athenaでデータベースとテーブルを作成する
Kinesis FirehoseがS3バケットに格納したJSONファイルを見るためのAthenaを作成していきます。 せっかくなのでPartition Projectionを使って、パーティション更新を自動化します。
データベースの作成
下記でデータベースを作成します。
CREATE DATABASE dynamodb_update_database
テーブルの作成
下記でテーブルを作成します。
CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table ( `action` string, `keys` struct<todoid:string>, `timestamp` bigint, `timestamp_utc` timestamp, `new_image` struct<todoid:string, title:string, done:boolean>, `old_image` struct<todoid:string, title:string, done:boolean> ) PARTITIONED BY ( `dateday` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) LOCATION 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'projection.enabled' = 'true', 'projection.dateday.type' = 'date', 'projection.dateday.range' = '2020/10/01,NOW', 'projection.dateday.format' = 'yyyy/MM/dd', 'projection.dateday.interval' = '1', 'projection.dateday.interval.unit' = 'DAYS', 'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}' );
できました!
試しにデータを確認する
この時点でお試し確認してみます。
10件取得
試しに下記クエリを実行します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' ORDER BY timestamp LIMIT 10;
しっかり確認できました!
特定のデータを確認
todoid:t0001
のデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' AND keys.todoid = 't0001' ORDER BY timestamp LIMIT 10;
最初(INSERT)のデータを確認
action:INSERT
のデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' AND action = 'INSERT' ORDER BY timestamp LIMIT 10;
タイトルに特定文字を含む
new_image
のtitle
にiPhone
を含むデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' AND new_image.title LIKE '%iPhone%' ORDER BY timestamp LIMIT 10;
Amazon Athenaのパーティション自動更新を確認する
テーブル作成時、Partition Projectionを使って、dateday
のパーティションを作成しました。本来であれば、GlueやLambda等でパーティションを更新する必要がありますが、Partition Projectionを使うことで自動化しています。本当に自動化されているのかを確認します。
1日待ってDynamoDBテーブルを変更する
1日待ってからDynamoDBテーブルに1件のデータを追加し、1件のデータを変更します。
データ追加
{ "todoId": "t8888", "title": "お米を買う", "done": false }
データ変更
{ "todoId": "t2222", "title": "iPhone 12 Pro Maxを予約する", "done": false }
S3バケットの様子
2020年10月27日分として格納されました。
内容もバッチリです。
{"action": "INSERT", "keys": {"todoId": "t8888"}, "new_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": null, "timestamp": 1603763256, "timestamp_utc": "2020-10-27 01:47:36"} {"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 mini\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763317, "timestamp_utc": "2020-10-27 01:48:37"}
Amazon Athenaで確認する
試しに下記クエリを実行します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/27' ORDER BY timestamp LIMIT 10;
GlueやLambdaでパーティション更新はしていませんが、、Partition Projectionのおかげでしっかり確認できました!
DynamoDBテーブルの項目を増やしてみる
項目にmemoを追加
DynamoDBテーブルの新しい項目としてmemo
を追加します。
{ "todoId": "t2222", "title": "iPhone 12 Pro Maxを予約する", "done": false, "memo": "Apple Storeオンラインで予約する" }
{ "todoId": "t8888", "title": "お米を買う", "done": false, "memo": "あきたこまち!" }
このままだとAmazon Athenaで確認できない
S3バケットのデータにはmemo
が含まれていますが、Amazon Athenaでmemo
は確認できません。
なぜなら、テーブル作成時の定義にmemo
が含まれていないからです。
{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"memo": "Apple Store\u30aa\u30f3\u30e9\u30a4\u30f3\u3067\u4e88\u7d04\u3059\u308b", "title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763890, "timestamp_utc": "2020-10-27 01:58:10"} {"action": "MODIFY", "keys": {"todoId": "t8888"}, "new_image": {"memo": "\u3042\u304d\u305f\u3053\u307e\u3061\uff01", "title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "timestamp": 1603764087, "timestamp_utc": "2020-10-27 02:01:27"}
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/27' ORDER BY timestamp LIMIT 10;
Amazon Athenaのテーブルを再作成する
Amazon Atenaのテーブルを削除し、memo
を含めたテーブル定義を作成します。
テーブルの削除
DROP TABLE `todo_item_update_table`;
memoを含めたテーブルを作成
new_image
とold_image
のstruct
にmemo:string
を追加しています。
CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table ( `action` string, `keys` struct<todoid:string>, `timestamp` bigint, `timestamp_utc` timestamp, `new_image` struct<todoid:string, title:string, done:boolean, memo:string>, `old_image` struct<todoid:string, title:string, done:boolean, memo:string> ) PARTITIONED BY ( `dateday` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) LOCATION 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'projection.enabled' = 'true', 'projection.dateday.type' = 'date', 'projection.dateday.range' = '2020/10/01,NOW', 'projection.dateday.format' = 'yyyy/MM/dd', 'projection.dateday.interval' = '1', 'projection.dateday.interval.unit' = 'DAYS', 'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}' );
Amazon Athenaでデータを確認する
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/27' ORDER BY timestamp LIMIT 10;
memo
が確認できました!
さいごに
DynamoDBのデータを更新したとき、DynamoDb Streams・Lambda・Kinesis Firehoseを経由してS3にファイルを格納し、Amazon Athenaで見る仕組みを作ってみました。 Kinesis FirehoseがS3に出力したデータをAmazon Athenaで見る際の参考にもなると思います。
参考
- DynamoDB ストリームから Lambda 関数を呼び出して Kinesis Firehose を経由して S3 バケットに出力してみた | Developers.IO
- [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | Developers.IO
- Amazon Athena を使用したパーティション射影 - Amazon Athena
- Amazon Kinesis Data Firehose 例 - Amazon Athena
- DynamoDB - AWS Serverless Application Model
- AWS::KinesisFirehose::DeliveryStream - AWS CloudFormation
- Firehose — Boto3 Docs 1.16.3 documentation
- boto3.dynamodb.types — Boto3 Docs 1.16.3 documentation
- パーティション射影のサポートされている型 - Amazon Athena
- Amazon Kinesis Data Firehose 例 - Amazon Athena
- [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | Developers.IO
- Amazon Athenaを使ってJSONファイルを検索してみる | Developers.IO
- Amazon Athena Nested-JSONのSESログファイルを検索する | Developers.IO
- Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ
- DDL ステートメント - Amazon Athena